package com.flink.examples.mysql;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Example job that queries a MySQL table through Flink's JDBC table source and
 * emits the result rows into a {@link DataStream}.
 *
 * @Author JL
 * @Date 2020/09/17
 * @Version V1.0
 */
public class DataStreamSource {

    /** Name of the MySQL table; also used as the name it is registered under in the table environment. */
    private static final String TABLE_NAME = "t_user";

    /** Query executed against the registered table. */
    private static final String QUERY_SQL =
            "SELECT id,name,age,sex,address,createTimeSeries FROM " + TABLE_NAME;

    /**
     * Shared JSON serializer. It is referenced statically from the map lambda, so it is not
     * captured in the serialized closure and is not re-created for every record.
     */
    private static final Gson GSON = new Gson();

    /**
     * Builds and runs the job: registers the JDBC-backed table, runs {@link #QUERY_SQL} on it,
     * converts the result to an append stream of {@link TUser} and prints every record as JSON.
     *
     * <p>Official documentation:
     * https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Column names and types of the table as exposed to the table environment.
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                // The createTime TIMESTAMP column is intentionally left unmapped:
                //.field("createTime", DataTypes.TIMESTAMP())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        // JDBC connection options; driver, URL and credentials come from MysqlConfig.
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setTableName(TABLE_NAME)
                .build();
        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setSchema(tableSchema)
                .build();

        // Register the JDBC source in the table environment under the same name as the MySQL table.
        tEnv.registerTableSource(TABLE_NAME, jdbcTableSource);
        Table table = tEnv.sqlQuery(QUERY_SQL);

        // Convert the query result to an append-only stream of TUser and print each record as JSON.
        DataStream<TUser> sourceStream = tEnv.toAppendStream(table, TUser.class);
        sourceStream.map(user -> GSON.toJson(user)).print();

        env.execute("flink mysql source");
    }

}
